In [167]:
import tensorflow as tf
In [168]:
# Record the exact library versions used, for reproducibility of this notebook.
%load_ext version_information
%version_information numpy, scipy, matplotlib, pandas, tensorflow, sklearn, skflow
Out[168]:
In [202]:
# Cluster layout as a Python-literal string: two parameter servers ('ps') and
# two workers, each addressed as host:port. Parsed with ast.literal_eval below.
CLUSTER_SPEC= """
{
'ps' : ['tensorflow0.pipeline.io:8888', 'tensorflow1.pipeline.io:8888'],
'worker' : [ 'tensorflow2.pipeline.io:8888','tensorflow3.pipeline.io:8888'],
}
"""
In [203]:
import ast
# Safely evaluate the literal string into a dict: {job_name: [host:port, ...]}.
cluster_def = ast.literal_eval(CLUSTER_SPEC)
In [204]:
# Display the parsed cluster dict.
cluster_def
Out[204]:
In [205]:
# Build a ClusterSpec, TensorFlow's canonical description of the cluster.
spec = tf.train.ClusterSpec(cluster_def)
In [206]:
# List the job names known to the ClusterSpec ('ps' and 'worker').
spec.jobs
Out[206]:
In [207]:
# Show the task list registered for each job in the ClusterSpec.
for job_name in spec.jobs:
    tasks = spec.job_tasks(job_name)
    print(job_name, tasks)
In [208]:
# Build one TensorFlow device string per task for each role in the cluster.
workers = ['/job:worker/task:{}'.format(idx)
           for idx, _ in enumerate(cluster_def['worker'])]
param_servers = ['/job:ps/task:{}'.format(idx)
                 for idx, _ in enumerate(cluster_def['ps'])]
In [209]:
# Display the worker device strings.
workers
Out[209]:
In [210]:
# Display the parameter-server device strings.
param_servers
Out[210]:
In [211]:
# NOTE(review): tf.Variable("local_cpu") creates a variable whose *initial value*
# is the string "local_cpu"; the name suggests a device label was intended.
# With no enclosing tf.device scope, placement falls to the default device.
l = tf.Variable("local_cpu")
# .device reports where the variable op was placed.
l.device
Out[211]:
We can control where an operation is placed by using the tf.device context manager.
In [212]:
# Pin a variable onto each parameter-server device in turn; ops created
# inside a tf.device context are placed on that device.
for ps_device in param_servers:
    with tf.device(ps_device):
        v = tf.Variable("my_var")
# After the loop, v is the variable placed on the last parameter server.
v.device
Out[212]:
In [213]:
def launch_worker(job_name, task_id, cluster_def):
    """Start an in-process TensorFlow server for one cluster role and block.

    The server is generic: whether it acts as a parameter server or a worker
    is determined by ``job_name`` against ``cluster_def``.

    Args:
        job_name: role in the cluster spec, e.g. 'ps' or 'worker'.
        task_id: index of this task within its job.
        cluster_def: dict mapping job names to lists of 'host:port' strings.
    """
    tf.train.Server(cluster_def, job_name=job_name, task_index=task_id).join()
To connect to a specific server, pass its address (the direct ip:port of the server) as the 'target' argument when creating a Session object.
Note that a server is generic and can assume either the parameter-server role or the worker role; the cluster configuration decides which.
The best practice is to create a single image that launches the TensorFlow server.
Environment variables then specify the exact role of each worker at run time.
gRPC is a Remote Procedure Call framework based on Protocol Buffers.
Each TensorFlow object that has to be sent over the wire has a gRPC definition.
Example of a gRPC declaration for a Variable:
syntax = "proto3";
package tensorflow;
// Protocol buffer representing a Variable.
message VariableDef {
// Name of the variable tensor.
string variable_name = 1;
// Name of the initializer op.
string initializer_name = 2;
// Name of the snapshot tensor.
string snapshot_name = 3;
}
Each variable can then be serialized using the to_proto method:
In [214]:
# Serialize the variable's definition to its protobuf (VariableDef) form.
v.to_proto()
Out[214]:
In [236]:
# Build a graph that splits the input across the workers, sums each slice on
# its worker, and aggregates the partial sums on the parameter server side.
batch_size = 1000  # NOTE(review): unused here; the feed below uses np.ones([1000])
graph = tf.Graph()
with graph.as_default():
    # The placeholder lives on the first parameter server.
    with tf.device('/job:ps/task:0'):
        input_array = tf.placeholder(tf.int32, shape=[None])
    # Divide the input across the cluster (pre-1.0 API: tf.split(axis, num, value)).
    all_reduce = []
    splitted = tf.split(0, len(workers), input_array)
    for idx, (portion, worker) in enumerate(zip(splitted, workers)):
        with tf.device(worker):
            print(worker)
            local_reduce = tf.reduce_sum(portion)
            # BUG FIX: tf.Print returns its *first* argument unchanged; the
            # original passed `portion`, so the un-reduced slice (not the
            # per-worker scalar sum) was appended to all_reduce. Pass the
            # reduced scalar through instead.
            local_reduce = tf.Print(local_reduce, [local_reduce], message="portion is")
            all_reduce.append(local_reduce)
    # Removed the dead `final_result = tf.Variable(0)` the original created
    # here: it was immediately shadowed by the assignment below and never
    # initialized or read.
    # Aggregate the per-worker partial sums (tf.pack is the pre-1.0 tf.stack).
    final_result = tf.reduce_sum(tf.pack(all_reduce))
In [237]:
# allow_soft_placement lets TF fall back to an available device when a pinned
# one is missing; log_device_placement logs where each op actually runs.
sess_config = tf.ConfigProto(
allow_soft_placement=True,
log_device_placement=True)
We can now run the graph
In [240]:
# NOTE(review): imports are conventionally collected in the first cell.
import numpy as np
# Request a full execution trace for this run (placement/timing inspection).
run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
# Connect to one worker's gRPC endpoint; it executes the distributed graph.
with tf.Session("grpc://tensorflow3.pipeline.io:8888", graph=graph, config=sess_config) as session:
result = session.run(final_result, feed_dict={ input_array: np.ones([1000]) }, options=run_options)
print(result)
We can also inspect any remote variable:
In [243]:
# Inspect which device the aggregation op was assigned to.
final_result.device
Out[243]:
In [242]:
# Evaluate a single worker's partial sum. After the graph-building loop,
# `local_reduce` refers to the tensor created in the loop's last iteration.
with tf.Session("grpc://tensorflow3.pipeline.io:8888", graph=graph, config=sess_config) as session:
result = session.run(local_reduce, feed_dict={ input_array: np.ones([1000]) }, options=run_options)
print(result)
In [ ]: